# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

skip_if_not_available("dataset")

library(dplyr, warn.conflicts = FALSE)

csv_dir <- make_temp_dir()
tsv_dir <- make_temp_dir()

# Data containing a header row
tbl <- df1[, c("int", "dbl")]
header_csv_dir <- make_temp_dir()
headerless_csv_dir <- make_temp_dir()

test_that("Setup (putting data in the dirs)", {
  dir.create(file.path(csv_dir, 5))
  dir.create(file.path(csv_dir, 6))
  write.csv(df1, file.path(csv_dir, 5, "file1.csv"), row.names = FALSE)
  write.csv(df2, file.path(csv_dir, 6, "file2.csv"), row.names = FALSE)
  expect_length(dir(csv_dir, recursive = TRUE), 2)

  # Now, tab-delimited
  dir.create(file.path(tsv_dir, 5))
  dir.create(file.path(tsv_dir, 6))
  write.table(df1, file.path(tsv_dir, 5, "file1.tsv"), row.names = FALSE, sep = "\t")
  write.table(df2, file.path(tsv_dir, 6, "file2.tsv"), row.names = FALSE, sep = "\t")
  expect_length(dir(tsv_dir, recursive = TRUE), 2)

  write.table(tbl, file.path(header_csv_dir, "file1.csv"), sep = ",", row.names = FALSE)
  write.table(tbl, file.path(headerless_csv_dir, "file1.csv"), sep = ",", row.names = FALSE, col.names = FALSE)
})

test_that("CSV dataset", {
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  expect_r6_class(ds$format, "CsvFileFormat")
  expect_r6_class(ds$filesystem, "LocalFileSystem")
  expect_identical(names(ds), c(names(df1), "part"))
  expect_identical(dim(ds), c(20L, 7L))

  expect_equal(
    ds %>%
      select(string = chr, integer = int, part) %>%
      filter(integer > 6 & part == 5) %>%
      collect() %>%
      summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
  # Collecting virtual partition column works
  expect_equal(
    collect(ds) %>% arrange(part) %>% pull(part),
    c(rep(5, 10), rep(6, 10))
  )
})

test_that("CSV scan options", {
  options <- FragmentScanOptions$create("text")
  expect_equal(options$type, "csv")
  options <- FragmentScanOptions$create("csv",
    null_values = c("mynull"),
    strings_can_be_null = TRUE
  )
  expect_equal(options$type, "csv")

  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")
  df <- tibble(chr = c("foo", "mynull"))
  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)

  ds <- open_dataset(dst_dir, format = "csv")
  expect_equal(ds %>% collect(), df)

  sb <- ds$NewScan()
  sb$FragmentScanOptions(options)

  tab <- sb$Finish()$ToTable()
  expect_equal_data_frame(tab, data.frame(chr = c("foo", NA), stringsAsFactors = FALSE))

  # Set default convert options in CsvFileFormat
  csv_format <- CsvFileFormat$create(
    null_values = c("mynull"),
    strings_can_be_null = TRUE
  )
  ds <- open_dataset(dst_dir, format = csv_format)
  expect_equal(ds %>% collect(), tibble(chr = c("foo", NA)))

  # Set both parse and convert options
  df <- tibble(chr = c("foo", "mynull"), chr2 = c("bar", "baz"))
  write.table(df, dst_file, row.names = FALSE, quote = FALSE, sep = "\t")
  ds <- open_dataset(dst_dir,
    format = "csv",
    delimiter = "\t",
    null_values = c("mynull"),
    strings_can_be_null = TRUE
  )
  expect_equal(ds %>% collect(), tibble(
    chr = c("foo", NA),
    chr2 = c("bar", "baz")
  ))
  expect_equal(
    ds %>%
      group_by(chr2) %>%
      summarize(na = all(is.na(chr))) %>%
      arrange(chr2) %>%
      collect(),
    tibble(
      chr2 = c("bar", "baz"),
      na = c(FALSE, TRUE)
    )
  )
})

test_that("compressed CSV dataset", {
  skip_if_not_available("gzip")
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv.gz")
  write.csv(df1, gzfile(dst_file), row.names = FALSE, quote = FALSE)
  format <- FileFormat$create("csv")
  ds <- open_dataset(dst_dir, format = format)
  expect_r6_class(ds$format, "CsvFileFormat")
  expect_r6_class(ds$filesystem, "LocalFileSystem")

  expect_equal(
    ds %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})

test_that("CSV dataset options", {
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")
  df <- tibble(chr = letters[1:10])
  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)

  format <- FileFormat$create("csv", skip_rows = 1)
  ds <- open_dataset(dst_dir, format = format)

  expect_equal(
    ds %>%
      select(string = a) %>%
      collect(),
    df1[-1, ] %>%
      select(string = chr)
  )

  ds <- open_dataset(dst_dir, format = "csv", column_names = c("foo"))

  expect_equal(
    ds %>%
      select(string = foo) %>%
      collect(),
    tibble(string = c(c("chr"), letters[1:10]))
  )
})

test_that("Other text delimited dataset", {
  ds1 <- open_dataset(tsv_dir, partitioning = "part", format = "tsv")
  expect_equal(
    ds1 %>%
      select(string = chr, integer = int, part) %>%
      filter(integer > 6 & part == 5) %>%
      collect() %>%
      summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )

  ds2 <- open_dataset(tsv_dir, partitioning = "part", format = "text", delimiter = "\t")
  expect_equal(
    ds2 %>%
      select(string = chr, integer = int, part) %>%
      filter(integer > 6 & part == 5) %>%
      collect() %>%
      summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})

test_that("readr parse options", {
  arrow_opts <- names(formals(csv_parse_options))
  readr_opts <- names(formals(readr_to_csv_parse_options))

  # Arrow and readr parse options must be mutually exclusive, or else the code
  # in `csv_file_format_parse_options()` will error or behave incorrectly. A
  # failure of this test indicates that these two sets of option names are not
  # mutually exclusive.
  expect_equal(
    intersect(arrow_opts, readr_opts),
    character(0)
  )

  # With not yet supported readr parse options
  expect_error(
    open_dataset(tsv_dir, partitioning = "part", delim = "\t", col_select = "integer"),
    "supported"
  )

  # With unrecognized (garbage) parse options
  expect_error(
    open_dataset(
      tsv_dir,
      partitioning = "part",
      format = "text",
      asdfg = "\\"
    ),
    "Unrecognized"
  )

  # With both Arrow and readr parse options (disallowed)
  expect_error(
    open_dataset(
      tsv_dir,
      partitioning = "part",
      format = "text",
      quote = "\"",
      quoting = TRUE
    ),
    "either"
  )

  # With ambiguous partial option names (disallowed)
  expect_error(
    open_dataset(
      tsv_dir,
      partitioning = "part",
      format = "text",
      del = ","
    ),
    "Ambiguous"
  )

  # With only readr parse options (and omitting format = "text")
  ds1 <- open_dataset(tsv_dir, partitioning = "part", delim = "\t")
  expect_equal(
    ds1 %>%
      select(string = chr, integer = int, part) %>%
      filter(integer > 6 & part == 5) %>%
      collect() %>%
      summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})

test_that("Can set null string values", {
  dst_dir <- make_temp_dir()
  df <- tibble(x = c(1, NA, 3))
  write_dataset(df, dst_dir, null_string = "NULL_VALUE", format = "csv")

  csv_contents <- readLines(list.files(dst_dir, full.names = TRUE)[1])
  expect_equal(csv_contents, c("\"x\"", "1", "NULL_VALUE", "3"))

  back <- open_dataset(dst_dir, null_values = "NULL_VALUE", format = "csv") %>% collect()
  expect_equal(df, back)

  # Also works with `na` parameter
  dst_dir <- make_temp_dir()
  write_dataset(df, dst_dir, na = "another_null", format = "csv")

  csv_contents <- readLines(list.files(dst_dir, full.names = TRUE)[1])
  expect_equal(csv_contents, c("\"x\"", "1", "another_null", "3"))

  back <- open_dataset(dst_dir, null_values = "another_null", format = "csv") %>% collect()
  expect_equal(df, back)
})

# see https://issues.apache.org/jira/browse/ARROW-12791
test_that("Error if no format specified and files are not parquet", {
  expect_error(
    open_dataset(csv_dir, partitioning = "part"),
    "Did you mean to specify a 'format' other than the default (parquet)?",
    fixed = TRUE
  )
  expect_error(
    open_dataset(csv_dir, partitioning = "part", format = "parquet"),
    "Parquet magic bytes not found"
  )
})

test_that("Column names can be inferred from schema", {
  # First row must be skipped if file has header
  ds <- open_dataset(
    header_csv_dir,
    format = "csv",
    schema = schema(int = int32(), dbl = float64()),
    skip_rows = 1
  )
  expect_equal(collect(ds), tbl)

  # If first row isn't skipped, supply user-friendly error
  ds <- open_dataset(
    header_csv_dir,
    format = "csv",
    schema = schema(int = int32(), dbl = float64())
  )

  expect_error(
    collect(ds),
    regexp = paste0(
      "If you have supplied a schema and your data contains a ",
      "header row, you should supply the argument `skip = 1` to ",
      "prevent the header being read in as data."
    )
  )

  ds <- open_dataset(
    headerless_csv_dir,
    format = "csv",
    schema = schema(int = int32(), dbl = float64())
  )
  expect_equal(ds %>% collect(), tbl)
})

test_that("Can use col_names readr parameter", {
  expected_names <- c("my_int", "my_double")
  ds <- open_dataset(
    headerless_csv_dir,
    format = "csv",
    col_names = expected_names
  )
  expect_equal(names(ds), expected_names)
  expect_equal(ds %>% collect(), set_names(tbl, expected_names))

  # WITHOUT header, makes up names
  ds <- open_dataset(
    headerless_csv_dir,
    format = "csv",
    col_names = FALSE
  )
  expect_equal(names(ds), c("f0", "f1"))
  expect_equal(ds %>% collect(), set_names(tbl, c("f0", "f1")))

  # WITH header, gets names
  ds <- open_dataset(
    header_csv_dir,
    format = "csv",
    col_names = TRUE
  )
  expect_equal(names(ds), c("int", "dbl"))
  expect_equal(ds %>% collect(), tbl)

  ds <- open_dataset(
    header_csv_dir,
    format = "csv",
    col_names = FALSE,
    skip = 1
  )
  expect_equal(names(ds), c("f0", "f1"))
  expect_equal(ds %>% collect(), set_names(tbl, c("f0", "f1")))

  expect_error(
    open_dataset(headerless_csv_dir, format = "csv", col_names = c("my_int"))
  )
})

test_that("open_dataset() deals with BOMs (byte-order-marks) correctly", {
  temp_dir <- make_temp_dir()
  writeLines("\xef\xbb\xbfa,b\n1,2\n", con = file.path(temp_dir, "file1.csv"))
  writeLines("\xef\xbb\xbfa,b\n3,4\n", con = file.path(temp_dir, "file2.csv"))

  expect_equal(
    open_dataset(temp_dir, format = "csv") %>% collect() %>% arrange(b),
    tibble(a = c(1, 3), b = c(2, 4))
  )
})

test_that("Error if read_options$column_names and schema-names differ (ARROW-14744)", {
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "file.csv")
  df <- df1[, c("int", "dbl")]
  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)

  schema <- schema(int = int32(), dbl = float64())

  # names in column_names but not in schema
  expect_error(
    open_dataset(csv_dir, format = "csv", schema = schema, column_names = c("int", "dbl", "lgl", "chr")),
    "`lgl` and `chr` not present in `schema`"
  )

  # names in schema but not in column_names
  expect_error(
    open_dataset(csv_dir, format = "csv", schema = schema, column_names = c("int")),
    "`dbl` not present in `column_names`"
  )

  # mismatches both ways
  expect_error(
    open_dataset(csv_dir, format = "csv", schema = schema, column_names = c("these", "wont", "match")),
    "`these`, `wont`, and `match` not present in `schema`.*`int` and `dbl` not present in `column_names`"
  )

  # correct names wrong order
  expect_error(
    open_dataset(csv_dir, format = "csv", schema = schema, column_names = c("dbl", "int")),
    "`column_names` and `schema` field names match but are not in the same order"
  )
})

test_that("skip argument in open_dataset", {
  tbl <- df1[, c("int", "dbl")]

  header_csv_dir <- make_temp_dir()
  write.table(tbl, file.path(header_csv_dir, "file1.csv"), sep = ",", row.names = FALSE)

  ds <- open_dataset(
    header_csv_dir,
    format = "csv",
    schema = schema(int = int32(), dbl = float64()),
    skip = 1
  )
  expect_equal(collect(ds), tbl)
})

test_that("error message if non-schema passed in as schema to open_dataset", {
  # passing in the schema function, not an actual schema
  expect_error(
    open_dataset(csv_dir, format = "csv", schema = schema),
    regexp = "`schema` must be an object of class 'Schema' not 'function'.",
    fixed = TRUE
  )
})

test_that("CSV reading/parsing/convert options can be passed in as lists", {
  tf <- tempfile()
  on.exit(unlink(tf))

  writeLines('"x"\n"y"\nNA\nNA\n"NULL"\n\n"foo"\n', tf)

  ds1 <- open_dataset(
    tf,
    format = "csv",
    convert_options = list(null_values = c("NA", "NULL"), strings_can_be_null = TRUE),
    read_options = list(skip_rows = 1L)
  ) %>%
    collect()

  ds2 <- open_dataset(
    tf,
    format = "csv",
    convert_options = csv_convert_options(null_values = c(NA, "NA", "NULL"), strings_can_be_null = TRUE),
    read_options = csv_read_options(skip_rows = 1L)
  ) %>%
    collect()

  expect_equal(ds1, ds2)
})

test_that("open_delim_dataset params passed through to open_dataset", {
  ds <- open_delim_dataset(csv_dir, delim = ",", partitioning = "part")
  expect_r6_class(ds$format, "CsvFileFormat")
  expect_r6_class(ds$filesystem, "LocalFileSystem")
  expect_identical(names(ds), c(names(df1), "part"))
  expect_identical(dim(ds), c(20L, 7L))

  # quote
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")

  df <- data.frame(a = c(1, 2), b = c("'abc'", "'def'"))
  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)

  ds_quote <- open_csv_dataset(dst_dir, quote = "'") %>% collect()
  expect_equal(ds_quote$b, c("abc", "def"))

  # na
  ds <- open_csv_dataset(csv_dir, partitioning = "part", na = c("", "NA", "FALSE")) %>% collect()
  expect_identical(ds$lgl, c(
    TRUE, NA, NA, TRUE, NA, TRUE, NA, NA, TRUE, NA, TRUE, NA, NA,
    TRUE, NA, TRUE, NA, NA, TRUE, NA
  ))

  # col_names and skip
  ds <- open_csv_dataset(
    csv_dir,
    partitioning = "part",
    col_names = paste0("col_", 1:6),
    skip = 1
  ) %>% collect()

  expect_named(ds, c("col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "part"))
  expect_equal(nrow(ds), 20)

  # col_types
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")

  df <- data.frame(a = c(1, NA, 2), b = c("'abc'", NA, "'def'"))
  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)

  data_schema <- schema(a = string(), b = string())
  ds_strings <- open_csv_dataset(dst_dir, col_types = data_schema)
  expect_equal(ds_strings$schema, schema(a = string(), b = string()))

  # skip_empty_rows
  tf <- tempfile()
  writeLines('"x"\n"y"\nNA\nNA\n"NULL"\n\n\n', tf)

  ds <- open_csv_dataset(tf, skip_empty_rows = FALSE) %>% collect()
  expect_equal(nrow(ds), 7)

  # convert_options
  ds <- open_csv_dataset(
    csv_dir,
    convert_options = list(null_values = c("NA", "", "FALSE"), strings_can_be_null = TRUE)
  ) %>% collect()

  expect_equal(
    ds$lgl,
    c(TRUE, NA, NA, TRUE, NA, TRUE, NA, NA, TRUE, NA, TRUE, NA, NA, TRUE, NA, TRUE, NA, NA, TRUE, NA)
  )

  # read_options
  ds <- open_csv_dataset(
    csv_dir,
    read_options = list(column_names = paste0("col_", 1:6))
  ) %>% collect()

  expect_named(ds, c("col_1", "col_2", "col_3", "col_4", "col_5", "col_6"))

  # schema
  ds <- open_csv_dataset(
    csv_dir,
    schema = schema(
      int = int64(), dbl = int64(), lgl = bool(), chr = utf8(),
      fct = utf8(), ts = timestamp(unit = "s")
    ),
    skip = 1
  ) %>% collect()

  expect_named(ds, c("int", "dbl", "lgl", "chr", "fct", "ts"))

  # quoted_na
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")
  writeLines("text,num\none,1\ntwo,2\n,3\nfour,4", dst_file)
  ds <- open_csv_dataset(dst_dir, quoted_na = TRUE) %>% collect()
  expect_equal(ds$text, c("one", "two", NA, "four"))

  ds <- open_csv_dataset(dst_dir, quoted_na = FALSE) %>% collect()
  expect_equal(ds$text, c("one", "two", "", "four"))

  # parse_options
  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")
  writeLines("x\n\n1\n\n\n2\n\n3", dst_file)
  ds <- open_csv_dataset(
    dst_dir,
    parse_options = csv_parse_options(ignore_empty_lines = FALSE)
  ) %>% collect()
  expect_equal(ds$x, c(NA, 1L, NA, NA, 2L, NA, 3L))

  # timestamp_parsers
  skip("GH-33708: timestamp_parsers don't appear to be working properly")

  dst_dir <- make_temp_dir()
  dst_file <- file.path(dst_dir, "data.csv")

  df <- data.frame(time = "2023-01-16 19:47:57")
  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)

  ds <- open_csv_dataset(dst_dir, timestamp_parsers = c(TimestampParser$create(format = "%d-%m-%y"))) %>% collect()

  expect_equal(ds$time, "16-01-2023")
})

test_that("CSVReadOptions printing", {
  default_read_options <- csv_read_options()
  custom_read_options <- csv_read_options(skip_rows = 102)

  expect_output(print(default_read_options), "skip_rows: 0")
  expect_output(print(custom_read_options), "skip_rows: 102")
})

test_that("CSVReadOptions field access", {
  options <- csv_read_options()
  expect_equal(options$skip_rows, 0)
  expect_equal(options$autogenerate_column_names, FALSE)
  expect_equal(options$skip_rows_after_names, 0)
  expect_equal(options$use_threads, option_use_threads())
  expect_equal(options$column_names, character(0))
  expect_equal(options$block_size, 1048576L)
  expect_equal(options$encoding, "UTF-8")
})

test_that("GH-34640 - CSV datasets are read in correctly when both schema and partitioning supplied", {
  target_schema <- schema(
    int = int32(), dbl = float32(), lgl = bool(), chr = utf8(),
    fct = utf8(), ts = timestamp(unit = "s"), part = int8()
  )

  ds <- open_dataset(
    csv_dir,
    partitioning = schema(part = int32()),
    format = "csv",
    schema = target_schema,
    skip = 1
  )
  expect_r6_class(ds$format, "CsvFileFormat")
  expect_r6_class(ds$filesystem, "LocalFileSystem")
  expect_identical(names(ds), c(names(df1), "part"))
  expect_identical(names(collect(ds)), c(names(df1), "part"))

  expect_identical(dim(ds), c(20L, 7L))
  expect_equal(schema(ds), target_schema)

  expect_equal(
    ds %>%
      select(string = chr, integer = int, part) %>%
      filter(integer > 6 & part == 5) %>%
      collect() %>%
      summarize(mean = mean(as.numeric(integer))),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})

test_that("open_dataset() with `decimal_point` argument", {
  temp_dir <- make_temp_dir()
  writeLines("x\ty\n1,2\tc", con = file.path(temp_dir, "file1.csv"))

  expect_equal(
    open_dataset(temp_dir, format = "tsv") %>% collect(),
    tibble(x = "1,2", y = "c")
  )

  expect_equal(
    open_dataset(temp_dir, format = "tsv", decimal_point = ",") %>% collect(),
    tibble(x = 1.2, y = "c")
  )
})
